Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push the runtime filter from HashJoin down to SeqScan or AM. #724

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

zhangyue-hashdata
Copy link
Contributor

+----------+ AttrFilter +------+ ScanKey +------------+
| HashJoin | ------------> | Hash | ---------> | SeqScan/AM |
+----------+ +------+ +------------+

If "gp_enable_runtime_filter_pushdown" is on, three steps will be run:

Step 1. In ExecInitHashJoin(), try to find the mapper between the var in
hashclauses and the var in SeqScan. If found we will save the mapper in
AttrFilter and push them to Hash node;

Step 2. We will create the range/bloom filters in AttrFilter during building
hash table, and these filters will be converted to the list of ScanKey
and pushed down to Seqscan when the building finishes;

Step 3. If AM support SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys will be pushed
down to the AM module further, otherwise will be used to filter slot in
Seqscan;

perf:
CPU E5-2680 v2 10 cores, memory 32GB, 3 segments

  1. tpcds 10s off: 865s on: 716s 17%
  2. tpcds 100s off: 4592s on: 3751s 18%

Fixes #ISSUE_Number

What does this PR do?

Type of Change

  • Bug fix (non-breaking change)
  • New feature (non-breaking change)
  • Breaking change (fix or feature with breaking changes)
  • Documentation update

Breaking Changes

Test Plan

  • Unit tests added/updated
  • Integration tests added/updated
  • Passed make installcheck
  • Passed make -C src/test installcheck-cbdb-parallel

Impact

Performance:

User-facing changes:

Dependencies:

Checklist

Additional Context

⚠️ To skip CI: Add [skip ci] to your PR title. Only use when necessary! ⚠️



/* append new runtime filters to target node */
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
sss->filters = list_concat(sss->filters, scankeys);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we merge filter here on the same attno ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Combining Bloom filters will result in a higher False Positive Rate (FPR) compared to using each of the individual Bloom filters separately, so it is not recommended;
  2. There is the same problem to combine range filters like combining Bloom filters;
  3. There is only one Bloom filter and one range filter on the same attribute in many cases;

Copy link
Member

@yjhjstz yjhjstz Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create table t1(a int, b int) with(parallel_workers=2);
create table rt1(a int, b int) with(parallel_workers=2);
create table rt2(a int, b int);
create table rt3(a int, b int);
insert into t1 select i, i from generate_series(1, 100000) i;
insert into t1 select i, i+1 from generate_series(1, 10) i;
insert into rt1 select i, i+1 from generate_series(1, 10) i;
insert into rt2 select i, i+1 from generate_series(1, 10000) i;
insert into rt3 select i, i+1 from generate_series(1, 10) i;
analyze t1;
analyze rt1;
analyze rt2;
analyze rt3;

explain analyze select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;

postgres=# explain select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
                                   QUERY PLAN                                   
--------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=2.45..428.51 rows=17 width=24)
   ->  Hash Join  (cost=2.45..428.29 rows=6 width=24)
         Hash Cond: (t1.b = rt1.a)
         ->  Hash Join  (cost=1.23..427.00 rows=6 width=16)
               Hash Cond: (t1.b = rt3.a)
               ->  Seq Scan on t1  (cost=0.00..342.37 rows=33337 width=8)
               ->  Hash  (cost=1.10..1.10 rows=10 width=8)
                     ->  Seq Scan on rt3  (cost=0.00..1.10 rows=10 width=8)
         ->  Hash  (cost=1.10..1.10 rows=10 width=8)
               ->  Seq Scan on rt1  (cost=0.00..1.10 rows=10 width=8)
 Optimizer: Postgres query optimizer
(11 rows)

you can try this case, will got two range filters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

continue;

val = slot_getattr(slot, sk->sk_attno, &isnull);
if (isnull)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CREATE TABLE distinct_1(a int);
CREATE TABLE distinct_2(a int);
INSERT INTO distinct_1 VALUES(1),(2),(NULL);
INSERT INTO distinct_2 VALUES(1),(NULL);
SELECT * FROM distinct_1, distinct_2 WHERE distinct_1.a IS NOT DISTINCT FROM distinct_2.a;

test got wrong result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

return slot;

if (node->filter_in_seqscan && node->filters &&
!PassByBloomFilter(node, slot))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tpcds 1TB, bloom filter will lose efficacy or create failed due to large rows ?

Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
-> Seq Scan on t1 (actual rows=1 loops=1)
-> Hash (actual rows=32 loops=1)
Buckets: 524288 Batches: 1 Memory Usage: 4098kB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to debug and get pushdown scankey here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case should cover that the hash join node or result node is the child of the parent hash join.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to debug and get pushdown scankey here ?

Add debug message to dump the stats about how many tuples are filter out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add debug message in 274d8aa

static void CreateRuntimeFilter(HashJoinState* hjstate);
static bool IsEqualOp(Expr *expr);
static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
static PlanState *FindTargetAttr(HashJoinState *hjstate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks FindTargetNode is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, i will fix it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix with 99eabb2

{
match = false;

if (!IsA(lfirst(lc), Var))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it support other expression, whose one arg is the column attr, and the other is a const?

@fanfuxiaoran
Copy link
Contributor

Looks interesting. And I have some questions to discuss.

  • Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.
  • Looks only when the hashjoin node and seqscan node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.

* result (hash filter)
* seqscan on t1, t1 is replicated table
*/
if (!IsA(child, HashJoinState) && !IsA(child, ResultState))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hash Join  (cost=0.00..4019.55 rows=37 width=9) (actual time=3203.012..9927.435 rows=1399 loops=1)
                                                   Hash Cond: (web_sales_1_prt_2.ws_item_sk = item.i_item_sk)
                                                   Join Filter: (web_sales_1_prt_2.ws_ext_discount_amt > ((1.3 * avg(web_sales_1_prt_2_1.ws_ext_discount_amt))))
                                                   Rows Removed by Join Filter: 4763
                                                   Extra Text: (seg2)   Hash chain length 1.0 avg, 1 max, using 198 of 2097152 buckets.
                                                   ->  Append  (cost=0.00..676.44 rows=2399189 width=13) (actual time=16.899..5572.473 rows=3090021 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_2  (cost=0.00..676.44 rows=2399189 width=13) (actual time=16.895..1138.267 rows=662
149 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_3  (cost=0.00..676.44 rows=2399189 width=13) (actual time=8.947..1102.409 rows=6621
36 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_4  (cost=0.00..676.44 rows=2399189 width=13) (actual time=8.822..1100.839 rows=6621
48 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_5  (cost=0.00..676.44 rows=2399189 width=13) (actual time=11.391..1083.785 rows=662
179 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_6  (cost=0.00..676.44 rows=2399189 width=13) (actual time=13.030..649.141 rows=4414
09 loops=1)
                                                         ->  Seq Scan on web_sales_1_prt_7  (cost=0.00..676.44 rows=2399189 width=13) (never executed)
                                                         ->  Seq Scan on web_sales_1_prt_others  (cost=0.00..676.44 rows=2399189 width=13) (actual time=1.213..3.203 rows=17
88 loops=1)
                                                   ->  Hash  (cost=2432.09..2432.09 rows=109 width=12) (actual time=3177.768..3177.770 rows=198 loops=1)
                                                         Buckets: 2097152  Batches: 1  Memory Usage: 16392kB
                                                         ->  Broadcast Motion 3:3  (slice3; segments: 3)  (cost=

need to consider partitioned table .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support partitioned table in c701092

Comment on lines 4285 to 4286
attr_filter->min = LLONG_MAX;
attr_filter->max = LLONG_MIN;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LLONG_MAX, LLONG_MIN are platform-spec value, i.e. the bound value for unsigned long long, which may not be exactly the same width as Datum. For safety, static assert could be considered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8"); in postgres.h, so it's better to use INT64_MAX/INT64_MIN here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use LONG_MAX, LONG_MIN instead ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix with 99eabb2

Comment on lines +2194 to +2196
/*
* Only applicatable for inner, right and semi join,
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give a little more explain about why these join types are supported and others are not?

Comment on lines 2283 to 2284
if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 lines duplicate with the following if-elseif-else code, could be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix with 99eabb2

Comment on lines 2302 to 2315
break;

var = lfirst(lc);
if (var->varno == INNER_VAR)
*rattno = var->varattno;
else if (var->varno == OUTER_VAR)
*lattno = var->varattno;
else
break;

match = true;
}

return match;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The match flag gets the code hard(several modifications) to read. The break statement could be replaced by return false;. If the foreach loop ends, all conditions match, so returns true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the more intuitive way to refactor the code, like below

/* check the first arg */
...

/* check the second arg */
...

return true;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix with 99eabb2

Comment on lines 106 to 107
if (TupIsNull(slot))
return slot;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that slot is never NULL here, so Assert(!TupIsNull(slot)); is better or remove them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix with 99eabb2

/*
* SK_EMPYT means the end of the array of the ScanKey
*/
sk[*num].sk_flags = SK_EMPYT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to check the boundary of the ScanKey array in rescan? In normal rescan, the number of ScanKeys is the same as begin_scan. If the number of ScanKeys is larger in rescan than that in begin_scan, the boundary value might be invalid and dangerous to access.

+----------+  AttrFilter   +------+  ScanKey   +------------+
| HashJoin | ------------> | Hash | ---------> | SeqScan/AM |
+----------+               +------+            +------------+

If "gp_enable_runtime_filter_pushdown" is on, three steps will be run:

Step 1. In ExecInitHashJoin(), try to find the mapper between the var in
        hashclauses and the var in SeqScan. If found we will save the mapper in
        AttrFilter and push them to Hash node;

Step 2. We will create the range/bloom filters in AttrFilter during building
        hash table, and these filters will be converted to the list of ScanKey
        and pushed down to Seqscan when the building finishes;

Step 3. If AM support SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys will be pushed
        down to the AM module further, otherwise will be used to filter slot in
        Seqscan;
return slot;

if (node->filter_in_seqscan && node->filters &&
!PassByBloomFilter(node, slot))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's difference with gp_enable_runtime_filter's bloom filter ?

@avamingli
Copy link
Contributor

There are codes changed in MultiExecParallelHash, please add some parallel tests with runtime filter.

@zhangyue-hashdata
Copy link
Contributor Author

There are codes changed in MultiExecParallelHash, please add some parallel tests with runtime filter.

got it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants